#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import shutil
import signal
import time
import unittest
from multiprocessing import Process

import numpy as np
from dist_test_utils import remove_ps_flag
from op import Operator

from paddle import base
from paddle.base import core
from paddle.base.framework import Program, program_guard
from paddle.incubate.distributed.fleet.parameter_server.mode import (
    DistributedMode,
)


def run_pserver(pserver_id):
    remove_ps_flag(os.getpid())
    scope = base.core.Scope()
    program = Program()
    with (
        base.scope_guard(scope),
        program_guard(program, startup_program=Program()),
    ):
        # create table parameter in scope
        place = base.CPUPlace()
        # create and initialize Param Variable
        param = scope.var('table').get_tensor()

        param_array = np.ones((5, 8)).astype("float32")
        for i in range(len(param_array)):
            param_array[i] *= param_array[i] * i + pserver_id * 10 + 1
        param.set(param_array, place)

        optimize_block = program._create_block(program.global_block().idx)
        program.global_block().append_op(
            type="listen_and_serv",
            inputs={'X': []},
            outputs={},
            attrs={
                "optimize_blocks": [optimize_block],
                "endpoint": '127.0.0.1:0',
                "Fanin": 1,
                "distributed_mode": DistributedMode.SYNC,
                "grad_to_block_id": [],
            },
        )

        exe = base.Executor(place)
        exe.run(program)


@unittest.skip("do not need currently")
class TestListenAndServOp(unittest.TestCase):
    def setUp(self):
        self.ps_timeout = 5

    def _start_pserver(self, pserver_id, pserver_func):
        p = Process(target=pserver_func, args=(pserver_id,))
        p.daemon = True
        p.start()
        return p

    def _wait_ps_ready(self, pid):
        start_left_time = self.ps_timeout
        sleep_time = 0.5
        while True:
            assert start_left_time >= 0, "wait ps ready failed"
            time.sleep(sleep_time)
            try:
                # the listen_and_serv_op would touch a file which contains the listen port
                # on the /tmp directory until it was ready to process all the RPC call.
                os.stat(f"/tmp/paddle.{pid}.port")
                return
            except OSError:
                start_left_time -= sleep_time

    def _get_pserver_port(self, pid):
        with open(f"/tmp/paddle.{pid}.port", 'r') as f:
            port = int(f.read().strip())
        return port

    def _run_nce_op_two_pserver(self, place, port0, port1, model_file):
        scope = base.core.Scope()
        program = Program()
        with (
            base.scope_guard(scope),
            program_guard(program, startup_program=Program()),
        ):
            emaps = ['127.0.0.1:' + str(port0), '127.0.0.1:' + str(port1)]

            # create and run recv and save operator
            remote_recv_op = Operator(
                "recv_save",
                trainer_id=0,
                shape=[10, 8],
                slice_shapes=["5,8", "5,8"],
                slice_varnames=["table", "table"],
                remote_varnames=['table', 'table'],
                is_sparse=False,
                endpoints=emaps,
                file_path=model_file,
            )

            remote_recv_op.run(scope, place)

    def _load_slice_var(self, model_file):
        load_prog = base.Program()
        load_block = load_prog.global_block()

        origin = load_block.create_var(
            name="var.origin",
            type=base.core.VarDesc.VarType.DENSE_TENSOR,
            shape=[10, 8],
            dtype="float32",
            persistable=True,
        )

        slice0 = load_block.create_var(
            name="var.slice0",
            type=base.core.VarDesc.VarType.DENSE_TENSOR,
            shape=[3, 8],
            dtype="float32",
            persistable=True,
        )

        slice1 = load_block.create_var(
            name="var.slice1",
            type=base.core.VarDesc.VarType.DENSE_TENSOR,
            shape=[5, 8],
            dtype="float32",
            persistable=True,
        )

        load_block.append_op(
            type='load',
            inputs={},
            outputs={'Out': [origin]},
            attrs={'file_path': model_file},
        )

        load_block.append_op(
            type='load',
            inputs={},
            outputs={'Out': [slice0]},
            attrs={
                'file_path': model_file,
                'seek': 2 * 8,
                'shape': slice0.shape,
            },
        )

        load_block.append_op(
            type='load',
            inputs={},
            outputs={'Out': [slice1]},
            attrs={
                'file_path': model_file,
                'seek': 5 * 8,
                'shape': slice1.shape,
            },
        )

        exe = base.Executor(place=base.CPUPlace())
        exe.run(load_prog)

        origin_var = base.global_scope().find_var("var.origin")
        slice0_var = base.global_scope().find_var("var.slice0")
        slice1_var = base.global_scope().find_var("var.slice1")

        origin = np.array(origin_var.get_tensor())
        slice0 = np.array(slice0_var.get_tensor())
        slice1 = np.array(slice1_var.get_tensor())

        np.testing.assert_equal(origin[2:5], slice0)
        np.testing.assert_equal(origin[5:10], slice1)

    def _save_by_io_persistables(self, place, port0, port1, dirname, var_name):
        self._run_nce_op_two_pserver(
            place, port0, port1, os.path.join(dirname, var_name)
        )

    def test_recv_save_op_remote(self):
        # run pserver on CPU in sync mode
        p0 = self._start_pserver(0, run_pserver)
        self._wait_ps_ready(p0.pid)
        port0 = self._get_pserver_port(p0.pid)

        p1 = self._start_pserver(1, run_pserver)
        self._wait_ps_ready(p1.pid)
        port1 = self._get_pserver_port(p1.pid)

        places = [core.CPUPlace()]

        param_dir = "./model_for_test_recv_save_op/"
        param_name = "table"

        for place in places:
            self._save_by_io_persistables(
                place, port0, port1, param_dir, param_name
            )

        # raise SIGTERM to pserver
        os.kill(p0.pid, signal.SIGINT)
        p0.join()
        os.kill(p1.pid, signal.SIGINT)
        p1.join()

        self._load_slice_var(param_dir + param_name)
        shutil.rmtree(param_dir)


if __name__ == '__main__':
    unittest.main()
